package org.ykx.demo.kafka

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kafka.KafkaUtils

import kafka.serializer.StringDecoder

object JLDataAnalytics {

  /** Default Spark master URL used when args(0) is not supplied. */
  private val DefaultMaster = "local[1]"

  /** Default Kafka broker list used when args(1) is not supplied. */
  private val DefaultBrokers = "10.10.61.193:6667,10.10.61.194:6667,10.10.61.195:6667"

  /** Default Kafka topic consumed when args(2) is not supplied. */
  private val DefaultTopic = "jiliang"

  /**
   * Entry point: consumes a Kafka topic via a receiver-less direct stream
   * in 1-second micro-batches and prints each batch to stdout.
   *
   * @param args optional positional arguments:
   *             args(0) = Spark master URL  (default: local[1])
   *             args(1) = Kafka broker list (default: hard-coded demo cluster)
   *             args(2) = Kafka topic name  (default: jiliang)
   */
  def main(args: Array[String]): Unit = {
    // Immutable vals with lift/getOrElse replace the original `var` +
    // reassignment; defaults are identical to the original hard-coded values,
    // so existing invocations behave the same.
    val masterUrl = args.lift(0).getOrElse(DefaultMaster)
    val brokers   = args.lift(1).getOrElse(DefaultBrokers)
    val topics    = Set(args.lift(2).getOrElse(DefaultTopic))

    // StreamingContext with 1-second batch interval.
    val conf = new SparkConf().setMaster(masterUrl).setAppName("testDATA")
    val ssc  = new StreamingContext(conf, Seconds(1))

    // NOTE(review): "serializer.class" looks like a producer-side setting and
    // is presumably ignored by this consumer — kept for parity with the
    // original configuration; confirm before removing.
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class"     -> "kafka.serializer.StringEncoder")

    // Direct (receiver-less) stream: Spark reads Kafka offsets itself,
    // one RDD partition per Kafka partition.
    val kafkaStream =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics)

    // Demo behaviour: print the first records of every micro-batch.
    kafkaStream.print()

    ssc.start()
    ssc.awaitTermination()
  }

}